---
title: "Subscription Service"
description: "Manages app subscriptions to data streams with permission enforcement"
---

## Overview

The Subscription Service manages which apps receive which data streams from a user session. It handles the subscription lifecycle, validates permissions, tracks history, and answers the lookups that message routing depends on.

**File**: `packages/cloud/src/services/session/subscription.service.ts`

## Key Features

1. **In-memory subscription storage** for fast access
2. **Permission validation** for each subscription
3. **Language-specific subscriptions** (e.g., `transcription:en-US`)
4. **Wildcard subscriptions** (`*` or `all`)
5. **Subscription history tracking** for debugging
6. **Calendar event and location caching**
7. **Rate limiting** for location updates

## Data Structures

### Subscription Storage

```typescript
// Active subscriptions keyed by "sessionId:packageName"
private subscriptions = new Map<string, Set<ExtendedStreamType>>();

// History tracking for debugging
private history = new Map<string, SubscriptionHistory[]>();

// Version tracking for concurrent updates
private subscriptionUpdateVersion = new Map<string, number>();
```
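
The composite key format can be illustrated with two small helpers. This is a minimal sketch: `makeKey` and `splitKey` are hypothetical names rather than the service's own `getKey` implementation, and the split assumes that package names never contain a colon.

```typescript
// Hypothetical helpers illustrating the "sessionId:packageName" key format.
// Assumption: package names never contain ":", so the LAST colon is the separator.
function makeKey(sessionId: string, packageName: string): string {
  return `${sessionId}:${packageName}`;
}

function splitKey(key: string): { sessionId: string; packageName: string } {
  const idx = key.lastIndexOf(":");
  if (idx === -1) {
    throw new Error(`Malformed subscription key: ${key}`);
  }
  return { sessionId: key.slice(0, idx), packageName: key.slice(idx + 1) };
}
```

Splitting on the last colon keeps the helper correct even if a session ID happens to contain a colon itself.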

### Subscription History

```typescript
interface SubscriptionHistory {
  timestamp: Date;
  subscriptions: ExtendedStreamType[];
  action: "add" | "remove" | "update";
}
```
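
Recording an entry might look like the sketch below. `appendHistory` is a hypothetical stand-in for the service's `addToHistory`, and the cap of 50 entries is an assumption: this page does not say whether history is bounded, but an unbounded in-memory list would grow for as long as the session lives.

```typescript
type ExtendedStreamType = string; // simplified stand-in for the SDK type

interface SubscriptionHistory {
  timestamp: Date;
  subscriptions: ExtendedStreamType[];
  action: "add" | "remove" | "update";
}

// Hypothetical cap; the source does not state whether history is bounded.
const MAX_HISTORY_ENTRIES = 50;

function appendHistory(
  history: Map<string, SubscriptionHistory[]>,
  key: string,
  subscriptions: ExtendedStreamType[],
  action: SubscriptionHistory["action"],
): void {
  const entries = history.get(key) ?? [];
  // Copy the array so later mutations by the caller do not rewrite history.
  entries.push({ timestamp: new Date(), subscriptions: [...subscriptions], action });
  // Drop the oldest entries once the cap is exceeded.
  if (entries.length > MAX_HISTORY_ENTRIES) {
    entries.splice(0, entries.length - MAX_HISTORY_ENTRIES);
  }
  history.set(key, entries);
}
```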

## Core Operations

### Update Subscriptions

Replaces an app's subscription set after validating permissions, then records the change in history:

```typescript
async updateSubscriptions(
  userSession: UserSession,
  packageName: string,
  subscriptions: SubscriptionRequest[]
): Promise<UserI | null> {
  // Keys use the session ID so that the prefix checks in the query methods match
  const key = this.getKey(userSession.sessionId, packageName);
  
  // Version tracking for concurrent updates
  const currentVersion = (this.subscriptionUpdateVersion.get(key) || 0) + 1;
  this.subscriptionUpdateVersion.set(key, currentVersion);
  
  // Create new subscription set
  const newSubscriptions = new Set<ExtendedStreamType>();
  
  // Process each subscription request
  for (const subRequest of subscriptions) {
    const streamType = subRequest.type;
    
    // Validate permission for stream type
    const hasPermission = await SimplePermissionChecker.checkPermission(
      packageName, 
      streamType
    );
    
    if (!hasPermission) {
      throw new Error(`Missing permission for stream type: ${streamType}`);
    }
    
    // Add base and language-specific subscriptions
    newSubscriptions.add(streamType);
    
    if (subRequest.config?.languages) {
      for (const lang of subRequest.config.languages) {
        const langStream = createTranscriptionStream(streamType, lang);
        newSubscriptions.add(langStream);
      }
    }
  }
  
  // Store subscriptions
  this.subscriptions.set(key, newSubscriptions);
  
  // Track history
  this.addToHistory(key, Array.from(newSubscriptions), "update");
  
  // Persist location rate to database if needed
  // ... database operations with retry logic
}
```
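
The listing above increments a per-key version but does not show how it is consumed. One plausible use, sketched here with a hypothetical `UpdateVersionTracker` class, is to re-check the version after an `await` and skip the write if a newer update has started in the meantime.

```typescript
// Hypothetical helper showing one way a per-key version counter can flag stale updates.
class UpdateVersionTracker {
  private versions = new Map<string, number>();

  // Called at the start of an update; returns the version this update owns.
  begin(key: string): number {
    const next = (this.versions.get(key) ?? 0) + 1;
    this.versions.set(key, next);
    return next;
  }

  // Called after awaiting; false means a newer update started in the meantime.
  isCurrent(key: string, version: number): boolean {
    return this.versions.get(key) === version;
  }
}

const tracker = new UpdateVersionTracker();
const first = tracker.begin("session-1:com.example.app");
const second = tracker.begin("session-1:com.example.app");
console.log(tracker.isCurrent("session-1:com.example.app", first));  // false
console.log(tracker.isCurrent("session-1:com.example.app", second)); // true
```

Whether the service discards, retries, or merges a stale update is not stated here, so treat the check as an illustration of the detection step only.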

### Query Subscriptions

#### Get Subscribed Apps

Find all apps subscribed to a specific stream:

```typescript
getSubscribedApps(
  userSession: UserSession,
  subscription: ExtendedStreamType
): string[] {
  const sessionId = userSession.sessionId;
  const subscribedApps: string[] = [];
  
  for (const [key, subs] of this.subscriptions.entries()) {
    if (!key.startsWith(`${sessionId}:`)) continue;
    
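    // Strip the "sessionId:" prefix; safer than split(":") if either part contains a colon
    const packageName = key.slice(sessionId.length + 1);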
    
    for (const sub of subs) {
      // Check exact match or wildcards
      if (sub === subscription || 
          sub === StreamType.ALL || 
          sub === StreamType.WILDCARD) {
        subscribedApps.push(packageName);
        break;
      }
      
      // Check language-specific matches
      if (isLanguageStream(sub) && isLanguageStream(subscription)) {
        const subInfo = parseLanguageStream(sub);
        const reqInfo = parseLanguageStream(subscription);
        
        if (subInfo?.type === reqInfo?.type && 
            subInfo?.language === reqInfo?.language) {
          subscribedApps.push(packageName);
          break;
        }
      }
    }
  }
  
  return subscribedApps;
}
```

#### Check Media Subscriptions

Determine what types of media processing are needed:

```typescript
hasPCMTranscriptionSubscriptions(sessionId: string): {
  hasMedia: boolean;
  hasPCM: boolean;
  hasTranscription: boolean;
} {
  let hasMedia = false;
  let hasPCM = false;
  let hasTranscription = false;
  
  for (const [key, subs] of this.subscriptions.entries()) {
    if (!key.startsWith(sessionId + ":")) continue;
    
    for (const sub of subs) {
      if (sub === StreamType.AUDIO_CHUNK) {
        hasPCM = true;
        hasMedia = true;
      } else if (sub === StreamType.TRANSLATION || 
                 sub === StreamType.TRANSCRIPTION) {
        hasTranscription = true;
        hasMedia = true;
      } else {
        // Check language-specific transcription/translation
        const langInfo = parseLanguageStream(sub as string);
        if (langInfo && 
            (langInfo.type === StreamType.TRANSLATION || 
             langInfo.type === StreamType.TRANSCRIPTION)) {
          hasTranscription = true;
          hasMedia = true;
        }
      }
    }
  }
  
  return { hasMedia, hasPCM, hasTranscription };
}
```

### Language Subscriptions

Get the minimal set of languages that must be processed for a session:

```typescript
getMinimalLanguageSubscriptions(sessionId: string): string[] {
  const languages = new Set<string>();
  
  for (const [key, subs] of this.subscriptions.entries()) {
    if (!key.startsWith(sessionId + ":")) continue;
    
    for (const sub of subs) {
      const langInfo = parseLanguageStream(sub as string);
      if (langInfo?.language) {
        languages.add(langInfo.language);
      }
    }
  }
  
  return Array.from(languages);
}
```
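
To make the deduplication concrete, here is a self-contained sketch. `parseLangStreamSketch` is a simplified, hypothetical stand-in for the SDK's `parseLanguageStream`: it only understands the `<type>:<language>` form shown earlier (for example `transcription:en-US`), and the real helper may accept more formats.

```typescript
// Simplified, hypothetical stand-in for the SDK's parseLanguageStream:
// it only understands "<type>:<language>" and returns null for anything else.
function parseLangStreamSketch(
  stream: string,
): { type: string; language: string } | null {
  const idx = stream.indexOf(":");
  if (idx <= 0 || idx === stream.length - 1) return null;
  return { type: stream.slice(0, idx), language: stream.slice(idx + 1) };
}

// Collect the distinct languages requested across several apps' subscription sets.
function collectLanguages(subscriptionSets: Iterable<Set<string>>): string[] {
  const languages = new Set<string>();
  for (const subs of subscriptionSets) {
    for (const sub of subs) {
      const info = parseLangStreamSketch(sub);
      if (info) languages.add(info.language);
    }
  }
  return Array.from(languages);
}

// Two apps both want en-US; only one of them wants fr-FR.
const appA = new Set(["transcription:en-US", "button_press"]);
const appB = new Set(["transcription:en-US", "transcription:fr-FR"]);
console.log(collectLanguages([appA, appB])); // ["en-US", "fr-FR"]
```

Because the result is built from a `Set`, a language requested by several apps appears only once.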

## Caching Services

### Calendar Events

```typescript
// Cache calendar events per session
cacheCalendarEvent(sessionId: string, event: CalendarEvent): void {
  if (!this.calendarEventsCache.has(sessionId)) {
    this.calendarEventsCache.set(sessionId, []);
  }
  this.calendarEventsCache.get(sessionId)!.push(event);
}

// Retrieve all events
getAllCalendarEvents(sessionId: string): CalendarEvent[] {
  return this.calendarEventsCache.get(sessionId) || [];
}
```

### Location Data

```typescript
// Cache last known location
cacheLocation(sessionId: string, location: Location): void {
  this.lastLocationCache.set(sessionId, location);
}

// Retrieve last location
getLastLocation(sessionId: string): Location | undefined {
  return this.lastLocationCache.get(sessionId);
}
```
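
The feature list mentions rate limiting for location updates, which this page does not show. One common approach is a per-session minimum interval, sketched below. `LocationThrottle` and its interval parameter are hypothetical and are not taken from the service's code.

```typescript
// Hypothetical throttle: forward at most one location update per interval per session.
class LocationThrottle {
  private lastSent = new Map<string, number>();
  private minIntervalMs: number;

  constructor(minIntervalMs: number) {
    this.minIntervalMs = minIntervalMs;
  }

  // Returns true if the update should be forwarded, false if it arrives too soon.
  shouldSend(sessionId: string, nowMs: number = Date.now()): boolean {
    const last = this.lastSent.get(sessionId);
    if (last !== undefined && nowMs - last < this.minIntervalMs) {
      return false;
    }
    this.lastSent.set(sessionId, nowMs);
    return true;
  }

  // Call during session cleanup so the map does not grow without bound.
  clear(sessionId: string): void {
    this.lastSent.delete(sessionId);
  }
}
```

Passing `nowMs` explicitly keeps the class easy to test; production callers can rely on the `Date.now()` default.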

## Session Cleanup

Remove all subscriptions and cached data for a session:

```typescript
removeAllSubscriptionsForSession(sessionId: string): void {
  const removedKeys: string[] = [];
  
  // Remove subscriptions
  for (const key of this.subscriptions.keys()) {
    if (key.startsWith(`${sessionId}:`)) {
      this.subscriptions.delete(key);
      removedKeys.push(key);
    }
  }
  
  // Clear caches
  this.calendarEventsCache.delete(sessionId);
  this.lastLocationCache.delete(sessionId);
  
  // Clean up history and version tracking
  for (const key of removedKeys) {
    this.history.delete(key);
    this.subscriptionUpdateVersion.delete(key);
  }
}
```

## Permission Integration

The service calls `SimplePermissionChecker.checkPermission` for every requested stream type and rejects the whole update if any check fails:

```typescript
const hasPermission = await SimplePermissionChecker.checkPermission(
  packageName, 
  streamType
);

if (!hasPermission) {
  throw new Error(`App ${packageName} lacks permission for ${streamType}`);
}
```

## Database Persistence

Location subscription rates are persisted to the User model with retry logic:

```typescript
// Retry logic for database operations
const maxRetries = 3;
for (let attempt = 0; attempt < maxRetries; attempt++) {
  try {
    const user = await User.findOne({ email: userSession.userId });
    if (!user) break; // Nothing to persist if the user document is missing
    
    // Update location subscription rate
    if (locationRate) {
      user.locationSubscriptions.set(sanitizedPackageName, {
        rate: locationRate
      });
    }
    
    user.markModified("locationSubscriptions");
    await user.save();
    break;
  } catch (error) {
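    // `error` is `unknown` under strict TypeScript settings, so narrow it first
    if (error instanceof Error && error.name === "VersionError" && attempt < maxRetries - 1) {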
      // Exponential backoff
      await new Promise(resolve => 
        setTimeout(resolve, Math.pow(2, attempt) * 100)
      );
    } else {
      throw error;
    }
  }
}
```
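
The retry loop above can be factored into a reusable helper. The sketch below is one way to do that; `withRetry`, `backoffDelayMs`, and `isVersionError` are hypothetical names, and the defaults mirror the three attempts and 100 ms base delay shown above.

```typescript
// Delay before the retry that follows attempt `attempt` (0-based): 100 ms, 200 ms, 400 ms, ...
function backoffDelayMs(attempt: number, baseMs: number = 100): number {
  return Math.pow(2, attempt) * baseMs;
}

// Hypothetical generic wrapper; `isRetryable` decides which errors are worth retrying.
async function withRetry<T>(
  operation: () => Promise<T>,
  isRetryable: (error: unknown) => boolean,
  maxRetries: number = 3,
  baseMs: number = 100,
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await operation();
    } catch (error) {
      const lastAttempt = attempt >= maxRetries - 1;
      // Give up on the final attempt or on errors that retrying cannot fix.
      if (lastAttempt || !isRetryable(error)) throw error;
      await new Promise<void>((resolve) =>
        setTimeout(resolve, backoffDelayMs(attempt, baseMs)),
      );
    }
  }
}

// Matches the check in the loop above: only optimistic-concurrency conflicts are retried.
const isVersionError = (error: unknown): boolean =>
  error instanceof Error && error.name === "VersionError";
```

The operation passed in should re-read the document on every attempt, as the loop above does with `User.findOne`; retrying a save on a stale copy would hit the same conflict again.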

## Best Practices

1. **Always validate permissions** before accepting subscriptions
2. **Use version tracking** for concurrent update detection
3. **Implement retry logic** for database operations
4. **Clean up session data** to prevent memory leaks
5. **Log subscription changes** for debugging
6. **Sanitize package names** for MongoDB keys
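
For the last point, Mongoose `Map` keys may not contain `.` or start with `$`, which is why a package name such as `com.example.app` cannot be used as a key directly. A minimal sketch of a sanitizer follows; `sanitizeMapKey` is a hypothetical name, and the service's actual replacement scheme is not shown on this page.

```typescript
// Hypothetical sanitizer: Mongoose Map keys may not contain "." or start with "$".
function sanitizeMapKey(packageName: string): string {
  return packageName
    .replace(/\./g, "_")  // dots would be interpreted as nested paths
    .replace(/^\$+/, ""); // a leading "$" is reserved for operators
}

console.log(sanitizeMapKey("com.example.app")); // "com_example_app"
```

This mapping is lossy: `com.a_b` and `com_a.b` both become `com_a_b`, so it is only safe if package names cannot collide that way.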

## Related Services

- **[SessionService](/cloud-architecture/services/session-service)**: Uses subscriptions for message routing
- **[SimplePermissionChecker](/cloud-architecture/services/permission-checker)**: Validates subscription permissions
- **[LocationService](/cloud-architecture/services/location-service)**: Handles location update rates
- **[UserSession](/cloud-architecture/session-management/user-session-class)**: Session context for subscriptions